/**
 * Copyright 2009 Sergio Bossa
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */
package com.googlecode.actorom.impl.remote;

import com.googlecode.actorom.Actor;
import com.googlecode.actorom.Address;
import com.googlecode.actorom.CompletableFutureResult;
import com.googlecode.actorom.FutureResult;
import com.googlecode.actorom.impl.core.CoreFutureResult;
import com.googlecode.actorom.impl.remote.channel.RemoteChannelConnector;
import com.googlecode.actorom.impl.remote.channel.protocol.Header;
import com.googlecode.actorom.impl.remote.channel.protocol.IsActiveCommand;
import com.googlecode.actorom.impl.remote.channel.protocol.LinkCommand;
import com.googlecode.actorom.impl.remote.channel.protocol.SendCommand;
import com.googlecode.actorom.impl.remote.channel.protocol.SendWithFutureCommand;
import com.googlecode.actorom.impl.remote.channel.protocol.SwapCommand;
import com.googlecode.actorom.impl.remote.channel.protocol.UnlinkCommand;
import com.googlecode.actorom.impl.remote.channel.protocol.support.ProtocolUtils;
import com.googlecode.actorom.support.ActorExecutionException;
import com.googlecode.actorom.support.ActorInvocationException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.ThreadSafe;
import org.apache.commons.collections.map.ReferenceMap;

/**
 * @author Sergio Bossa
 */
@ThreadSafe
public class RemoteActorProxy implements Actor {

    private final Address address;
    private final RemoteChannelConnector connector;
    private final Map<String, CompletableFutureResult> futures;

    public RemoteActorProxy(Address address, RemoteChannelConnector connector) {
        this.address = address;
        this.connector = connector;
        this.futures = new ReferenceMap(ReferenceMap.HARD, ReferenceMap.WEAK);
        // Connect the connector to the remote server topology:
        this.connector.connect();
    }

    public synchronized void swap(Object handler) {
        if (isConnected() && isActiveActor()) {
            String id = ProtocolUtils.generateCorrelationId();
            Header header = new Header(id);
            boolean acknowledged = connector.send(new SwapCommand(header, address, handler));
            if (!acknowledged) {
                throw new ActorInvocationException("Error while swapping handler of remote actor: " + address);
            }
        } else {
            throw new ActorInvocationException("Error while swapping handler of remote actor: " + address);
        }
    }

    public synchronized void send(Object message) {
        if (isConnected() && isActiveActor()) {
            String id = ProtocolUtils.generateCorrelationId();
            Header header = new Header(id);
            boolean acknowledged = connector.send(new SendCommand(header, address, message));
            if (!acknowledged) {
                throw new ActorInvocationException("Error while sending message to remote actor: " + address);
            }
        } else {
            throw new ActorInvocationException("Error while sending message to remote actor: " + address);
        }
    }

    public synchronized FutureResult send(Object message, long timeout, TimeUnit unit) {
        if (isConnected() && isActiveActor()) {
            String id = ProtocolUtils.generateCorrelationId();
            CompletableFutureResult future = new CoreFutureResult(unit.toNanos(timeout));
            // Put the future prior to actually send the message!
            futures.put(id, future);
            //
            Header header = new Header(id);
            boolean acknowledged = connector.send(new SendWithFutureCommand(header, address, message, unit.toNanos(timeout)));
            if (acknowledged) {
                return future;
            } else {
                futures.remove(id);
                throw new ActorInvocationException("Error while sending message to remote actor: " + address);
            }
        } else {
            throw new ActorInvocationException("Error while sending message to remote actor: " + address);
        }
    }

    public synchronized void link(Actor actor) {
        if (isConnected() && isActiveActor()) {
            String id = ProtocolUtils.generateCorrelationId();
            Header header = new Header(id);
            boolean acknowledged = connector.send(new LinkCommand(header, address, actor.getAddress()));
            if (!acknowledged) {
                throw new ActorInvocationException("Error while linking from remote actor: " + address + " to: " + actor.getAddress());
            }
        } else {
            throw new ActorInvocationException("Error while linking from remote actor: " + address + " to: " + actor.getAddress());
        }
    }

    public synchronized void unlink(Actor actor) {
        if (isConnected() && isActiveActor()) {
            String id = ProtocolUtils.generateCorrelationId();
            Header header = new Header(id);
            boolean acknowledged = connector.send(new UnlinkCommand(header, address, actor.getAddress()));
            if (!acknowledged) {
                throw new ActorInvocationException("Error while unlinking source remote actor: " + address + " from: " + actor.getAddress());
            }
        } else {
            throw new ActorInvocationException("Error while unlinking source remote actor: " + address + " from: " + actor.getAddress());
        }
    }

    public synchronized boolean isActive() {
        return isConnected() && isActiveActor();
    }

    public synchronized Address getAddress() {
        return address;
    }

    public synchronized void completeFuture(String id, Object result, ActorExecutionException exception) {
        CompletableFutureResult future = futures.remove(id);
        if (future != null) {
            if (exception != null) {
                future.completeWithException(exception);
            } else {
                future.completeWithResult(result);
            }
        }
    }

    public synchronized void disconnect() {
        connector.disconnect();
    }

    private boolean isConnected() {
        return connector.isConnected();
    }

    private boolean isActiveActor() {
        String id = ProtocolUtils.generateCorrelationId();
        Header header = new Header(id);
        boolean acknowledged = connector.send(new IsActiveCommand(header, address));
        return acknowledged;
    }
}
